/**
 * +----------------------------------------------------------------------------------------------+
 * |Date               |  Version  |Author             |Description                              |
 * |==========+=======+==============+===================|
 * |2018年9月13日     |  1.0.0       | kreo                 |Initial                                       |
 * +----------------------------------------------------------------------------------------------+
 */
package main.java.utils.hxy.thread.thread;

import com.wanma.framework.util.IDate;
import com.wanma.framework.util.IKit;
import com.wanma.framework.util.IStr;
import com.wanma.framework.util.IUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * 任务线程池工厂类, 特点是根据Key去取得线程池 , 可设置每个线程池的大小 , 结束后直接关闭线程池
 *
 * @ClassName ITaskPoolFactory
 * @Description
 */
public class ITaskPoolFactory {
    private static Logger log = LoggerFactory.getLogger(ITaskPoolFactory.class);
    // 记录进度的Map , 记录标识和已执行数
    private static ConcurrentHashMap<String, CountDownLatch> progress = new ConcurrentHashMap<>();
    private static final int DEFAULT_POOL_SIZE = 8;

    /**
     * 使用随机uuid作为唯一标识 , 适用于isWait = false(可直接返回uuid的那种方法), 或者不需要进度跟踪的场景
     * 线程池默认为 8个线程
     *
     * @return
     */
    public static <T> String runnables(List<T> dataList, boolean withContext, boolean isWait, Consumer<T> consumer) {
        return runnables(null, dataList, DEFAULT_POOL_SIZE, withContext, isWait, consumer);
    }

    /**
     * 使用随机uuid作为唯一标识 , 适用于isWait = false(可直接返回uuid的那种方法), 或者不需要进度跟踪的场景
     *
     * @return
     */
    public static <T> String runnables(List<T> dataList, int poolSize, boolean withContext, boolean isWait, Consumer<T> consumer) {
        return runnables(null, dataList, poolSize, withContext, isWait, consumer);
    }

    /**
     * 创建任务型多线程
     *
     * @param uuid        唯一标识 , 可通过该标识查看任务进度 : getCount(uuid)方法可得到剩余任务数量
     * @param dataList       任务数据队列
     * @param poolSize    同时进行的任务数 (线程池大小)
     * @param withContext 是都在进程中带上context(用户登录信息)
     * @param isWait      是否阻断主线程等待所有任务结束
     * @param consumer    自定义消费者方法
     * @return
     */
    public static <T> String runnables(String uuid, List<T> dataList, int poolSize, boolean withContext, boolean isWait, Consumer<T> consumer) {
        if (IStr.isBlank(uuid) || progress.containsKey(uuid)) {
            uuid = IUtils.getUUID();
        }
        final String guid = uuid;
        try {
            if (!(dataList == null || dataList.size() == 0)) {
                ThreadPoolExecutor executor = new ThreadPoolExecutor(
                        poolSize,
                        poolSize,
                        15,
                        TimeUnit.SECONDS,
                        new TimeLinkedBlockingQueue<>(Integer.MAX_VALUE) // 超出初始线程后会存放在此处等待处理,如果queue容量不够,才会继续增大线程到最大线程
                        // 如果最大线程也不能处理.则直接抛错
                );
                log.debug("创建线程池>");
                executor.allowCoreThreadTimeOut(true);
                final CountDownLatch latch = new CountDownLatch(dataList.size());
                progress.put(guid, latch);
                for (T t : dataList) {
                    IRunnable<T> iRunnable = new IRunnable<T>().setWithContext(withContext).setData(t);
                    iRunnable.setExecute(consumer, latch);
                    executor.execute(iRunnable.runnable());
                }

                if (isWait) {
                    shutdown(guid, executor);
                } else {
                    IThreadPool.addTask(() -> shutdown(guid, executor));
                }
            }
        } finally {

        }
        return guid;
    }

    /**
     * 得到进程的剩余数量
     *
     * @param uuid
     * @return
     */
    public static Long getCount(String uuid) {
        if (progress.containsKey(uuid)) {
            return progress.get(uuid).getCount();
        } else {
            return 0L;
        }
    }

    private static void shutdown(String guid, ThreadPoolExecutor executor) {
        if (progress.containsKey(guid)) {
            CountDownLatch latch = progress.get(guid);
            try {
                latch.await();
            } catch (Exception e) {
                log.warn(guid + " > latch.await 失败 > ", e);
            }
        }
        IPool.shutdown(executor);
        progress.remove(guid);
        log.debug(">>>>>>>>>>>关闭了线程>>>>>>>>>>>");
    }

    public static void testCode(boolean withContent) {
        Random random = new Random();
        // int wait = random.nextInt(2000) + 1000;
        List<Integer> testLong = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            testLong.add(random.nextInt(2000) + 1000);
        }
        AtomicInteger atomicInteger = new AtomicInteger();
        ITaskPoolFactory.runnables(testLong, 10, withContent, true, p -> {
            long start = IDate.getNowMillis();
            int n = atomicInteger.getAndIncrement();
            try {
                System.out.println(">>> 第" + n + "个线程启动 : " + p + " >>> " + (withContent ? IKit.getUser() : ""));
                Thread.sleep(p);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                long exec = IDate.getNowMillis() - start;
                System.out.println("<<< 第" + n + "个线程关闭 : " + exec + " <<<");
            }
        });
        log.debug("执行完毕..");
    }


    public static void main(String[] args) {
        ITaskPoolFactory.testCode(false);
    }
}
